{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Taking Full Advantage of Parallelism With Data Distribution\n",
    "_**Using Amazon SageMaker's Managed, Distributed Training with Different Data Distribution Methods**_\n",
    "\n",
    "---\n",
    "\n",
    "---\n",
    "\n",
    "## Contents\n",
    "\n",
    "1. [Background](#Background)\n",
    "1. [Setup](#Setup)\n",
    "1. [Data](#Data)\n",
    "  1. [Scaling](#Scaling)\n",
    "1. [Train](#Train)\n",
    "  1. [Timing](#Timing)\n",
    "1. [Host](#Host)\n",
    "  1. [Evaluate](#Evaluate)\n",
    "1. [Extensions](#Extensions)\n",
    "\n",
    "\n",
    "## Background\n",
    "\n",
    "Amazon SageMaker makes it easy to train machine learning models across a large number of machines.  This is a non-trivial process, but Amazon SageMaker Algorithms and pre-built MXNet and TensorFlow containers hide most of the complexity from you.  Nevertheless, how you structure your data affects how distributed training is carried out.  This notebook walks through setting up your data to take full advantage of distributed processing.\n",
    "\n",
    "---\n",
    "## Setup\n",
    "\n",
    "_This notebook was created and tested on an ml.m4.xlarge notebook instance._\n",
    "\n",
    "Let's start by specifying:\n",
    "\n",
    "- The S3 bucket and prefix that you want to use for training and model data.  This should be within the same region as the Notebook Instance, training, and hosting.\n",
    "- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these.  Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "isConfigCell": true
   },
   "outputs": [],
   "source": [
    "# S3 bucket and prefix\n",
    "bucket = '<your_s3_bucket_name_here>'\n",
    "prefix = 'sagemaker/DEMO-data-distribution-types'\n",
    "\n",
    "# Define IAM role\n",
    "import boto3\n",
    "import re\n",
    "from sagemaker import get_execution_role\n",
    "\n",
    "role = get_execution_role()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we'll import the Python libraries we'll need for the remainder of the exercise."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import boto3\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "from IPython.display import display\n",
    "import io\n",
    "import time\n",
    "import copy\n",
    "import json\n",
    "import sys\n",
    "import sagemaker.amazon.common as smac\n",
    "import os"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "\n",
    "## Data\n",
    "\n",
    "The [dataset](https://aws.amazon.com/public-datasets/gdelt/) we'll use for this notebook is from the [Global Database of Events, Language and Tone (GDELT) Project](https://www.gdeltproject.org/).  This information is freely available on S3 as part of the [AWS Public Datasets](https://aws.amazon.com/public-datasets/) program.\n",
    "\n",
    "The data are stored as multiple files on S3, in two different formats: historical, which covers 1979 to 2013, and daily updates, which cover 2013 onward.  For this example, we'll stick to historical.  Let's bring in the 1979 data for interactive exploration.  We'll write a simple function so that we can later use it to download multiple files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
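   },
   "outputs": [],
   "source": [
    "def get_gdelt(filename):\n",
    "    # Download one GDELT events file from the public S3 bucket\n",
    "    s3 = boto3.resource('s3')\n",
    "    s3.Bucket('gdelt-open-data').download_file('events/' + filename, '.gdelt.csv')\n",
    "    # The events files carry no header row, so don't let pandas consume the first record as one\n",
    "    df = pd.read_csv('.gdelt.csv', sep='\\t', header=None)\n",
    "    # Column names come from a separate lookup file\n",
    "    header = pd.read_csv('https://www.gdeltproject.org/data/lookups/CSV.header.historical.txt', sep='\\t')\n",
    "    df.columns = header.columns\n",
    "    return df"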
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data = get_gdelt('1979.csv')\n",
    "data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we can see, there are 57 columns, some of which are sparsely populated, cryptically named, and in a format that's not particularly friendly for machine learning.  So, for our use case, we'll strip down to a few core attributes.  We'll use:\n",
    "\n",
    "- `EventCode`: This is the raw CAMEO action code describing the action that Actor1 performed upon Actor2.  More detail can be found [here](https://www.gdeltproject.org/data/documentation/CAMEO.Manual.1.1b3.pdf).\n",
    "- `NumArticles`:  This is the total number of source documents containing one or more mentions of this event. This can be used as a method of assessing the “importance” of an event; the more discussion of that event, the more likely it is to be significant.\n",
    "- `AvgTone`: This is the average “tone” of all documents containing one or more mentions of this event. The score ranges from -100 (extremely negative) to +100 (extremely positive). Common values range between -10 and +10, with 0 indicating neutral.\n",
    "- `Actor1Geo_Lat`: This is the centroid latitude of the Actor1 landmark for mapping.\n",
    "- `Actor1Geo_Long`: This is the centroid longitude of the Actor1 landmark for mapping.\n",
    "- `Actor2Geo_Lat`: This is the centroid latitude of the Actor2 landmark for mapping.\n",
    "- `Actor2Geo_Long`: This is the centroid longitude of the Actor2 landmark for mapping."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data = data[['EventCode', 'NumArticles', 'AvgTone', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']].copy()\n",
    "data['EventCode'] = data['EventCode'].astype(object)\n",
    "\n",
    "for column in data.select_dtypes(include=['object']).columns:\n",
    "    display(pd.crosstab(index=data[column], columns='% observations', normalize='columns'))\n",
    "\n",
    "display(data.describe())\n",
    "hist = data.hist(bins=30, sharey=True, figsize=(10, 10))\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see:\n",
    "- `EventCode` is pretty unevenly distributed, with some events making up 7%+ of the observations and others being a thousandth of a percent.\n",
    "- `AvgTone` seems to be reasonably smoothly distributed, while `NumArticles` has a long tail, and `Actor` geo features have suspiciously large spikes near 0.\n",
    "\n",
    "Let's remove the (0, 0) lat-longs, one-hot encode `EventCode`, and prepare our data for a machine learning model.  For this example we'll keep things straightforward and try to predict `AvgTone`, using the other variables in our dataset as features.\n",
    "\n",
    "One more issue remains.  As we noticed above, some values of `EventCode` are very rare and may not occur in every single year.  This means that if we one-hot encode individual years one at a time, our feature matrix may change shape from year to year, which will not work.  Therefore, we'll limit all years to the most common `EventCode` values from the year we currently have.  Let's get this list."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "events = pd.crosstab(index=data['EventCode'], columns='count').sort_values(by='count', ascending=False).index[:20]"
   ]
  },
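  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see why a fixed list of event codes keeps the feature matrix the same width from year to year, here is a minimal pure-Python sketch.  The helper `one_hot` and the toy event codes are hypothetical and only illustrate the idea; they are not part of the notebook's pipeline.\n",
    "\n",
    "```python\n",
    "def one_hot(codes, vocabulary):\n",
    "    # Encode each code as a 0/1 vector over a fixed vocabulary.\n",
    "    # Codes outside the vocabulary are dropped, mirroring the filtering step.\n",
    "    index = {code: i for i, code in enumerate(vocabulary)}\n",
    "    rows = []\n",
    "    for code in codes:\n",
    "        if code not in index:\n",
    "            continue  # rare code: skip the record\n",
    "        row = [0] * len(vocabulary)\n",
    "        row[index[code]] = 1\n",
    "        rows.append(row)\n",
    "    return rows\n",
    "\n",
    "\n",
    "vocabulary = ['010', '042', '190']      # hypothetical most common codes\n",
    "year_a = ['010', '042', '999', '190']   # contains a rare code\n",
    "year_b = ['042', '042']                 # is missing two of the common codes\n",
    "\n",
    "encoded_a = one_hot(year_a, vocabulary)\n",
    "encoded_b = one_hot(year_b, vocabulary)\n",
    "```\n",
    "\n",
    "Both encoded years have exactly `len(vocabulary)` columns, even though one year contains a rare code and the other is missing some of the common ones."
   ]
  },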
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Scaling\n",
    "\n",
    "Now that we've explored our data and are ready to prepare for modeling, we can start developing a few simple functions to help us scale this to GDELT datasets from other years."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def write_to_s3(bucket, prefix, channel, file_prefix, X, y):\n",
    "    # Serialize features and labels to RecordIO-wrapped protobuf and upload to S3\n",
    "    buf = io.BytesIO()\n",
    "    smac.write_numpy_to_dense_tensor(buf, X.astype('float32'), y.astype('float32'))\n",
    "    buf.seek(0)\n",
    "    key = '/'.join([prefix, channel, file_prefix + '.data'])\n",
    "    boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_fileobj(buf)\n",
    "\n",
    "def transform_gdelt(df, events=None):\n",
    "    # Keep the target (AvgTone) in the first column, followed by the features\n",
    "    df = df[['AvgTone', 'EventCode', 'NumArticles', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']].copy()\n",
    "    df['EventCode'] = df['EventCode'].astype(object)\n",
    "    if events is not None:\n",
    "        df = df[df['EventCode'].isin(events)]\n",
    "    # Drop records where either actor sits at (0, 0), then one-hot encode EventCode\n",
    "    actor1_at_origin = (df['Actor1Geo_Lat'] == 0) & (df['Actor1Geo_Long'] == 0)\n",
    "    actor2_at_origin = (df['Actor2Geo_Lat'] == 0) & (df['Actor2Geo_Long'] == 0)\n",
    "    return pd.get_dummies(df[~actor1_at_origin & ~actor2_at_origin])\n",
    "\n",
    "def prepare_gdelt(bucket, prefix, file_prefix, events=None, random_state=1729):\n",
    "    df = get_gdelt(file_prefix + '.csv')\n",
    "    model_data = transform_gdelt(df, events)\n",
    "    # Shuffle, then split 90% / 10% into training and validation sets\n",
    "    # (.values replaces .as_matrix(), which newer pandas versions no longer provide)\n",
    "    train_data, validation_data = np.split(model_data.sample(frac=1, random_state=random_state).values,\n",
    "                                           [int(0.9 * len(model_data))])\n",
    "    write_to_s3(bucket, prefix, 'train', file_prefix, train_data[:, 1:], train_data[:, 0])\n",
    "    write_to_s3(bucket, prefix, 'validation', file_prefix, validation_data[:, 1:], validation_data[:, 0])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "for year in range(1979, 1984):\n",
    "    prepare_gdelt(bucket, prefix, str(year), events)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "\n",
    "## Train\n",
    "\n",
    "Now that we have our data in S3, we can begin training.  We'll use Amazon SageMaker's linear regression algorithm, and will actually fit two models in order to properly compare data distribution types:\n",
    "1. In the first job, we'll use `FullyReplicated` for our `train` channel.  This will pass every file in our input S3 location to every machine (in this case we're using 5 machines).\n",
    "1. In the second job, we'll use `ShardedByS3Key` for the `train` channel (note that we'll keep `FullyReplicated` for the `validation` channel).  So, for the training data, we'll pass each S3 object to a separate machine.  Since we have 5 files (one for each year), we'll train on 5 machines, meaning each machine will get a year's worth of records.\n",
    "\n",
    "First let's set up a list of training parameters which are common across the two jobs."
   ]
  },
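  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The difference between the two distribution types can be illustrated with a small pure-Python sketch.  The function `assign_objects` is hypothetical, and the round-robin assignment used for `ShardedByS3Key` is an assumption made for illustration only; how SageMaker actually assigns objects to instances may differ.\n",
    "\n",
    "```python\n",
    "def assign_objects(keys, instance_count, distribution):\n",
    "    # Return one list per instance: the S3 keys that instance receives.\n",
    "    if distribution == 'FullyReplicated':\n",
    "        # Every instance gets a copy of every object.\n",
    "        return [list(keys) for _ in range(instance_count)]\n",
    "    if distribution == 'ShardedByS3Key':\n",
    "        # Each object goes to exactly one instance (round-robin is assumed here).\n",
    "        return [list(keys[i::instance_count]) for i in range(instance_count)]\n",
    "    raise ValueError('Unknown distribution type: {}'.format(distribution))\n",
    "\n",
    "\n",
    "keys = ['train/{}.data'.format(year) for year in range(1979, 1984)]\n",
    "replicated = assign_objects(keys, 5, 'FullyReplicated')\n",
    "sharded = assign_objects(keys, 5, 'ShardedByS3Key')\n",
    "\n",
    "print([len(k) for k in replicated])  # objects per instance when replicated\n",
    "print([len(k) for k in sharded])     # objects per instance when sharded\n",
    "```\n",
    "\n",
    "With 5 files and 5 instances, each instance reads all 5 objects under `FullyReplicated` but only 1 under `ShardedByS3Key`."
   ]
  },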
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from sagemaker.amazon.amazon_estimator import get_image_uri\n",
    "container = get_image_uri(boto3.Session().region_name, 'linear-learner')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "common_training_params = {\n",
    "    \"RoleArn\": role,\n",
    "    \"AlgorithmSpecification\": {\n",
    "        \"TrainingImage\": container,\n",
    "        \"TrainingInputMode\": \"File\"\n",
    "    },\n",
    "    \"ResourceConfig\": {\n",
    "        \"InstanceCount\": 5,\n",
    "        \"InstanceType\": \"ml.c4.2xlarge\",\n",
    "        \"VolumeSizeInGB\": 10\n",
    "    },\n",
    "    \"InputDataConfig\": [\n",
    "        {\n",
    "            \"ChannelName\": \"train\",\n",
    "            \"DataSource\": {\n",
    "                \"S3DataSource\": {\n",
    "                    \"S3DataType\": \"S3Prefix\",\n",
    "                    \"S3Uri\": \"s3://{}/{}/train/\".format(bucket, prefix)\n",
    "                }\n",
    "            },\n",
    "            \"CompressionType\": \"None\",\n",
    "            \"RecordWrapperType\": \"None\"\n",
    "        },\n",
    "        {\n",
    "            \"ChannelName\": \"validation\",\n",
    "            \"DataSource\": {\n",
    "                \"S3DataSource\": {\n",
    "                    \"S3DataType\": \"S3Prefix\",\n",
    "                    \"S3Uri\": \"s3://{}/{}/validation/\".format(bucket, prefix),\n",
    "                    \"S3DataDistributionType\": \"FullyReplicated\"\n",
    "                }\n",
    "            },\n",
    "            \"CompressionType\": \"None\",\n",
    "            \"RecordWrapperType\": \"None\"\n",
    "        }\n",
    "\n",
    "    ],\n",
    "    \"OutputDataConfig\": {\n",
    "        \"S3OutputPath\": \"s3://{}/{}/\".format(bucket, prefix)\n",
    "    },\n",
    "    \"HyperParameters\": {\n",
    "        \"feature_dim\": \"25\",\n",
    "        \"mini_batch_size\": \"500\",\n",
    "        \"predictor_type\": \"regressor\",\n",
    "        \"epochs\": \"2\",\n",
    "        \"num_models\": \"32\",\n",
    "        \"loss\": \"absolute_loss\"\n",
    "    },\n",
    "    \"StoppingCondition\": {\n",
    "        \"MaxRuntimeInSeconds\": 60 * 60\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we'll create two separate jobs, updating the parameters that are unique to each."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sharded_job = 'DEMO-linear-sharded-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "\n",
    "print(\"Job name is:\", sharded_job)\n",
    "\n",
    "sharded_training_params = copy.deepcopy(common_training_params)\n",
    "sharded_training_params['TrainingJobName'] = sharded_job\n",
    "sharded_training_params['InputDataConfig'][0]['DataSource']['S3DataSource']['S3DataDistributionType'] = 'ShardedByS3Key'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "replicated_job = 'DEMO-linear-replicated-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "\n",
    "print(\"Job name is:\", replicated_job)\n",
    "\n",
    "replicated_training_params = copy.deepcopy(common_training_params)\n",
    "replicated_training_params['TrainingJobName'] = replicated_job\n",
    "replicated_training_params['InputDataConfig'][0]['DataSource']['S3DataSource']['S3DataDistributionType'] = 'FullyReplicated'"
   ]
  },
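  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`copy.deepcopy` matters here because `InputDataConfig` is a nested structure.  The sketch below uses a trimmed-down, hypothetical `params` dictionary to show that a shallow copy would share the nested channel settings with the original, while a deep copy keeps them independent.\n",
    "\n",
    "```python\n",
    "import copy\n",
    "\n",
    "# Hypothetical, trimmed-down stand-in for common_training_params\n",
    "params = {'InputDataConfig': [{'DataSource': {'S3DataSource': {}}}]}\n",
    "\n",
    "shallow = copy.copy(params)\n",
    "deep = copy.deepcopy(params)\n",
    "\n",
    "# Changing the shallow copy also changes the original, because the nested\n",
    "# list and dictionaries are shared objects.\n",
    "shallow['InputDataConfig'][0]['DataSource']['S3DataSource']['S3DataDistributionType'] = 'ShardedByS3Key'\n",
    "leaked = 'S3DataDistributionType' in params['InputDataConfig'][0]['DataSource']['S3DataSource']\n",
    "\n",
    "# The deep copy was taken before the change and is unaffected by it.\n",
    "isolated = 'S3DataDistributionType' not in deep['InputDataConfig'][0]['DataSource']['S3DataSource']\n",
    "\n",
    "print(leaked, isolated)\n",
    "```\n",
    "\n",
    "Had the two jobs been built from shallow copies, setting the distribution type for one would have silently overwritten it for the other."
   ]
  },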
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's submit these jobs.  `create_training_job` returns immediately, so the first job runs in the background while we submit the second, and the two run in parallel.  We'll place the waiter on the `FullyReplicated` job, as we expect it to finish second because of the additional data loading time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "region = boto3.Session().region_name\n",
    "sm = boto3.Session().client('sagemaker')\n",
    "\n",
    "sm.create_training_job(**sharded_training_params)\n",
    "sm.create_training_job(**replicated_training_params)\n",
    "\n",
    "status = sm.describe_training_job(TrainingJobName=replicated_job)['TrainingJobStatus']\n",
    "print(status)\n",
    "sm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=replicated_job)\n",
    "status = sm.describe_training_job(TrainingJobName=replicated_job)['TrainingJobStatus']\n",
    "print(\"Training job ended with status: \" + status)\n",
    "if status == 'Failed':\n",
    "    message = sm.describe_training_job(TrainingJobName=replicated_job)['FailureReason']\n",
    "    print('Training failed with the following error: {}'.format(message))\n",
    "    raise Exception('Training job failed')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's confirm both jobs have finished."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "print('Sharded:', sm.describe_training_job(TrainingJobName=sharded_job)['TrainingJobStatus'])\n",
    "print('Replicated:', sm.describe_training_job(TrainingJobName=replicated_job)['TrainingJobStatus'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Timing\n",
    "\n",
    "Let's compare how long it took to train a model with the two different distribution types.  To do this, we'll take the timestamps from CloudWatch logs using the following function.\n",
    "\n",
    "_Note that these times exclude the time it took to set up hardware and load containers._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def get_train_timing(job):\n",
    "    client = boto3.client('logs')\n",
    "    streams = client.describe_log_streams(logGroupName='/aws/sagemaker/TrainingJobs', \n",
    "                                          logStreamNamePrefix=job)\n",
    "    streams = [s['logStreamName'] for s in streams['logStreams']]\n",
    "    times = []\n",
    "    for stream in streams:\n",
    "        events = client.get_log_events(logGroupName='/aws/sagemaker/TrainingJobs', \n",
    "                                       logStreamName=stream)['events']\n",
    "        times += [e['timestamp'] for e in events]\n",
    "    return (max(times) - min(times)) / 60000."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "print('Sharded:', get_train_timing(sharded_job), 'minutes')\n",
    "print('Replicated:', get_train_timing(replicated_job), 'minutes')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we can see, and might expect, the sharded distribution type trained almost 4 times as fast.  This is a key differentiator to consider when preparing data and picking the distribution type."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "\n",
    "## Host\n",
    "\n",
    "Now that we've trained our machine learning models, we'll want to make predictions.  So, we'll set up a hosted endpoint for each of them.  The first step in doing that is to point our hosting service to the model.  We will:\n",
    "1. Point to the model.tar.gz that came from training\n",
    "1. Create the hosting model\n",
    "\n",
    "_Note, we'll do these twice, once for the model trained on replicated data and once for the model trained on sharded data._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sharded_model_response = sm.create_model(\n",
    "    ModelName=sharded_job,\n",
    "    ExecutionRoleArn=role,\n",
    "    PrimaryContainer={\n",
    "        'Image': container,\n",
    "        'ModelDataUrl': sm.describe_training_job(TrainingJobName=sharded_job)['ModelArtifacts']['S3ModelArtifacts']})\n",
    "\n",
    "print(sharded_model_response['ModelArn'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "replicated_model_response = sm.create_model(\n",
    "    ModelName=replicated_job,\n",
    "    ExecutionRoleArn=role,\n",
    "    PrimaryContainer={\n",
    "        'Image': container,\n",
    "        'ModelDataUrl': sm.describe_training_job(TrainingJobName=replicated_job)['ModelArtifacts']['S3ModelArtifacts']})\n",
    "\n",
    "print(replicated_model_response['ModelArn'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once we've set up our models, we can configure what our hosting endpoints should be.  Here we specify:\n",
    "1. EC2 instance type to use for hosting\n",
    "1. Initial number of instances\n",
    "1. Our hosting model name\n",
    "\n",
    "Again, we'll do this twice, once for each model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sharded_endpoint_config = 'DEMO-sharded-endpoint-config-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "print(sharded_endpoint_config)\n",
    "sharded_endpoint_config_response = sm.create_endpoint_config(\n",
    "    EndpointConfigName=sharded_endpoint_config,\n",
    "    ProductionVariants=[{\n",
    "        'InstanceType': 'ml.m4.xlarge',\n",
    "        'InitialInstanceCount': 1,\n",
    "        'ModelName': sharded_job,\n",
    "        'VariantName': 'AllTraffic'}])\n",
    "\n",
    "print(\"Endpoint Config Arn: \" + sharded_endpoint_config_response['EndpointConfigArn'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "replicated_endpoint_config = 'DEMO-replicated-endpoint-config-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "print(replicated_endpoint_config)\n",
    "replicated_endpoint_config_response = sm.create_endpoint_config(\n",
    "    EndpointConfigName=replicated_endpoint_config,\n",
    "    ProductionVariants=[{\n",
    "        'InstanceType': 'ml.m4.xlarge',\n",
    "        'InitialInstanceCount': 1,\n",
    "        'ModelName': replicated_job,\n",
    "        'VariantName': 'AllTraffic'}])\n",
    "\n",
    "print(\"Endpoint Config Arn: \" + replicated_endpoint_config_response['EndpointConfigArn'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we've specified how our endpoints should be configured, we can create them.  Endpoint creation runs in the background, so we'll kick off both and set up a waiter on the second endpoint so that we know when they are ready for use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "sharded_endpoint = 'DEMO-sharded-endpoint-' + time.strftime(\"%Y%m%d%H%M\", time.gmtime())\n",
    "print(sharded_endpoint)\n",
    "sharded_endpoint_response = sm.create_endpoint(\n",
    "    EndpointName=sharded_endpoint,\n",
    "    EndpointConfigName=sharded_endpoint_config)\n",
    "print(sharded_endpoint_response['EndpointArn'])\n",
    "\n",
    "replicated_endpoint = 'DEMO-replicated-endpoint-' + time.strftime(\"%Y%m%d%H%M\", time.gmtime())\n",
    "print(replicated_endpoint)\n",
    "replicated_endpoint_response = sm.create_endpoint(\n",
    "    EndpointName=replicated_endpoint,\n",
    "    EndpointConfigName=replicated_endpoint_config)\n",
    "print(replicated_endpoint_response['EndpointArn'])\n",
    "\n",
    "resp = sm.describe_endpoint(EndpointName=replicated_endpoint)\n",
    "status = resp['EndpointStatus']\n",
    "print(\"Status: \" + status)\n",
    "\n",
    "sm.get_waiter('endpoint_in_service').wait(EndpointName=replicated_endpoint)\n",
    "\n",
    "resp = sm.describe_endpoint(EndpointName=replicated_endpoint)\n",
    "status = resp['EndpointStatus']\n",
    "print(\"Arn: \" + resp['EndpointArn'])\n",
    "print(\"Status: \" + status)\n",
    "\n",
    "if status != 'InService':\n",
    "    raise Exception('Endpoint creation did not succeed')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's confirm both are ready for use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "print('Sharded:', sm.describe_endpoint(EndpointName=sharded_endpoint)['EndpointStatus'])\n",
    "print('Replicated:', sm.describe_endpoint(EndpointName=replicated_endpoint)['EndpointStatus'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Evaluate\n",
    "\n",
    "To compare predictions from our two models, let's bring in some new data from a year the model was not trained or validated on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "test_data = transform_gdelt(get_gdelt('1984.csv'), events).values\n",
    "test_X = test_data[:, 1:]\n",
    "test_y = test_data[:, 0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we'll need a function to convert these numpy matrices to CSVs so they can be passed to our endpoint as an HTTP POST request."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def np2csv(arr):\n",
    "    csv = io.BytesIO()\n",
    "    np.savetxt(csv, arr, delimiter=',', fmt='%g')\n",
    "    return csv.getvalue().decode().rstrip()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, because POST requests to our endpoint are limited to ~6MB, we'll set up a small function that splits our test data into mini-batches of about 5MB each, loops through them and invokes our endpoint to get predictions, and gathers the results into a single array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def predict_batches(data, endpoint):\n",
    "    rows = 5. * 1024. * 1024. / sys.getsizeof(np2csv(data[0, :]))\n",
    "    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))\n",
    "    predictions = []\n",
    "    runtime = boto3.Session().client('runtime.sagemaker')\n",
    "    for array in split_array:\n",
    "        payload = np2csv(array)\n",
    "        response = runtime.invoke_endpoint(EndpointName=endpoint,\n",
    "                                           ContentType='text/csv',\n",
    "                                           Body=payload)\n",
    "        result = json.loads(response['Body'].read().decode())\n",
    "        predictions += [r['score'] for r in result['predictions']]\n",
    "    return np.array(predictions)"
   ]
  },
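  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The batching idea can also be expressed directly in terms of encoded bytes.  The sketch below is a pure-Python alternative rather than the method used above: the helper `batch_rows` and its 5 MB default are assumptions, and it measures each CSV row with `len(row.encode())` instead of `sys.getsizeof`.\n",
    "\n",
    "```python\n",
    "def batch_rows(rows, max_bytes=5 * 1024 * 1024):\n",
    "    # Group CSV rows into batches whose newline-joined payload stays within max_bytes.\n",
    "    batches, current, size = [], [], 0\n",
    "    for row in rows:\n",
    "        row_bytes = len(row.encode('utf-8'))\n",
    "        if row_bytes > max_bytes:\n",
    "            raise ValueError('A single row exceeds max_bytes')\n",
    "        # +1 accounts for the newline that joins this row to the previous one.\n",
    "        extra = row_bytes + (1 if current else 0)\n",
    "        if current and size + extra > max_bytes:\n",
    "            # Adding this row would overflow the batch, so start a new one.\n",
    "            batches.append('\\n'.join(current))\n",
    "            current, size = [], 0\n",
    "            extra = row_bytes\n",
    "        current.append(row)\n",
    "        size += extra\n",
    "    if current:\n",
    "        batches.append('\\n'.join(current))\n",
    "    return batches\n",
    "\n",
    "\n",
    "rows = ['1,2,3', '4,5,6', '7,8,9']\n",
    "payloads = batch_rows(rows, max_bytes=11)  # tiny limit, just to force a split\n",
    "```\n",
    "\n",
    "Each element of `payloads` could then be sent as the `Body` of one request.  Sizing by encoded bytes tracks what is actually sent over the wire, whereas `sys.getsizeof` also counts Python object overhead and so errs on the side of smaller batches."
   ]
  },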
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we'll compare accuracy in mean squared error (MSE)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sharded_predictions = predict_batches(test_X, sharded_endpoint)\n",
    "replicated_predictions = predict_batches(test_X, replicated_endpoint)\n",
    "\n",
    "print('Sharded MSE =', np.mean((test_y - sharded_predictions) ** 2))\n",
    "print('Replicated MSE =', np.mean((test_y - replicated_predictions) ** 2))"
   ]
  },
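  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The models above were trained with `absolute_loss` but are compared on MSE.  As a small worked example of how the two error measures differ, the sketch below computes both on toy values.  The helper names are hypothetical and the numbers are not GDELT results.\n",
    "\n",
    "```python\n",
    "def mean_squared_error(actual, predicted):\n",
    "    # Average of squared differences\n",
    "    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)\n",
    "\n",
    "\n",
    "def mean_absolute_error(actual, predicted):\n",
    "    # Average of absolute differences\n",
    "    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)\n",
    "\n",
    "\n",
    "actual = [1.0, -2.0, 3.0]      # toy values, not GDELT data\n",
    "predicted = [0.0, -2.0, 5.0]\n",
    "\n",
    "mse = mean_squared_error(actual, predicted)   # (1 + 0 + 4) / 3\n",
    "mae = mean_absolute_error(actual, predicted)  # (1 + 0 + 2) / 3\n",
    "```\n",
    "\n",
    "Because squaring weights large errors more heavily, the two measures can rank a pair of models differently, which is worth keeping in mind when the training loss and the evaluation metric are not the same."
   ]
  },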
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see that the fully replicated distribution type performs just slightly better in terms of fit.  However, this difference is small compared to the overall speedup that providing multiple S3 objects and distributing them across machines provides."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "\n",
    "## Extensions\n",
    "\n",
    "This notebook ran a regression on a relatively artificial example, and we skipped some pre-processing steps along the way (like potentially transforming or winsorizing our target variable, looking for interactions in our features, etc.).  But the main point was to highlight the difference in training time and accuracy of a linear model trained through two different distribution methods.\n",
    "\n",
    "Overall, sharding data into separate files and sending them to separate training nodes will run faster, but may produce lower accuracy than a model that replicates the data across all nodes.  Naturally, this can be influenced by training the sharded model longer, with more epochs.  And it should be noted that we trained with a very small number of epochs to highlight this difference.\n",
    "\n",
    "Different algorithms can be expected to show variation in which distribution mechanism is most effective at achieving optimal compute spend per point of model accuracy.  The message remains the same though, that the process of finding the right distribution type is another experiment in optimizing model training times."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### (Optional) Clean-up\n",
    "\n",
    "If you're ready to be done with this notebook, please run the cell below.  This will remove the hosted endpoints you created and avoid any charges from a stray instance being left on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sm.delete_endpoint(EndpointName=sharded_endpoint)\n",
    "sm.delete_endpoint(EndpointName=replicated_endpoint)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Environment (conda_python3)",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.  Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
